package com.example.socket.executor;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * A Netty event executor that runs every submitted task on one consumer thread fed by
 * an LMAX Disruptor ring buffer.
 *
 * <p>Tasks passed to {@link #execute(Runnable)} are published into the ring buffer and run,
 * in publication order, by the single {@link RingeventWorkHandler} registered with the
 * Disruptor. The consumer thread is created through the {@link ThreadFactory} handed to the
 * constructor.
 *
 * <p>Lifecycle: the executor is started by its constructor, moves to "shutdown" as soon as
 * one of the shutdown methods is invoked, and to "terminated" once the Disruptor has been
 * shut down successfully.
 *
 * @author frank
 */
public class NettySingleThreadEventExecutor extends AbstractEventExecutor {
    private static final Logger logger = LoggerFactory.getLogger(NettySingleThreadEventExecutor.class);

    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTDOWN = 3;
    private static final int ST_TERMINATE = 4;

    /** Current lifecycle state; only ever advances through the {@code ST_*} constants. */
    private volatile int state = ST_NOT_STARTED;
    private final Disruptor<ThreadWork> disruptor;
    private final RingBuffer<ThreadWork> ringBuffer;
    private final SingleThreadExecutor executor;
    private final EventExecutorGroup parent;
    private final RingeventWorkHandler handler;

    /**
     * Creates the executor and immediately starts its Disruptor.
     *
     * @param parent        the group reported by {@link #parent()}
     * @param threadFactory factory handed to the underlying {@code SingleThreadExecutor}
     * @param bufferSize    ring buffer size passed to the Disruptor
     */
    public NettySingleThreadEventExecutor(EventExecutorGroup parent,
                                          ThreadFactory threadFactory, int bufferSize) {
        super();
        this.parent = parent;
        executor = new SingleThreadExecutor(threadFactory);
        // execute() places no restriction on the calling thread, so several threads may
        // publish concurrently; the single-producer sequencer is not safe for that.
        disruptor = new Disruptor<>(FACTORY, bufferSize, executor,
                ProducerType.MULTI, new SleepingWaitStrategy());
        handler = new RingeventWorkHandler();
        disruptor.handleEventsWith(handler);
        ringBuffer = disruptor.start();
        state = ST_STARTED;
    }

    @Override
    public EventExecutorGroup parent() {
        return parent;
    }

    /**
     * @return {@code true} if {@code thread} is the thread held by the underlying executor
     */
    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == executor.getThread();
    }

    /**
     * This executor has no separate "shutting down" phase, so this is the same as
     * {@link #isShutdown()}.
     */
    @Override
    public boolean isShuttingDown() {
        return isShutdown();
    }

    /**
     * Validates the arguments and, unless a shutdown was already requested, shuts the
     * executor down on the calling thread before returning.
     *
     * <p>{@code quietPeriod} is only validated; it does not delay the shutdown.
     *
     * @param quietPeriod must be {@code >= 0}
     * @param timeout     must be {@code >= quietPeriod}; forwarded to the Disruptor shutdown
     * @param unit        unit of {@code timeout}; must not be {@code null}
     * @return the termination future of the underlying executor
     * @throws IllegalArgumentException if {@code quietPeriod} or {@code timeout} is out of range
     * @throws NullPointerException     if {@code unit} is {@code null}
     */
    @Override
    public io.netty.util.concurrent.Future<?> shutdownGracefully(
            long quietPeriod, long timeout, TimeUnit unit) {
        if (quietPeriod < 0) {
            throw new IllegalArgumentException("quietPeriod: " + quietPeriod
                    + " (expected >= 0)");
        }
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException("timeout: " + timeout
                    + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        if (!isShuttingDown()) {
            shutdown(timeout, unit);
        }
        return terminationFuture();
    }

    @Override
    public io.netty.util.concurrent.Future<?> terminationFuture() {
        return executor.getTerminationFuture();
    }

    @Override
    public boolean isShutdown() {
        return state >= ST_SHUTDOWN;
    }

    @Override
    public boolean isTerminated() {
        return state >= ST_TERMINATE;
    }

    /**
     * Shuts the executor down (if that has not happened yet) and reports whether it
     * reached the terminated state.
     *
     * <p>Note that, unlike a plain wait, this call itself initiates the shutdown.
     *
     * @param timeout forwarded to the Disruptor shutdown
     * @param unit    unit of {@code timeout}; must not be {@code null}
     * @return {@code true} if the executor is terminated when this method returns,
     *         {@code false} if the Disruptor shutdown did not complete
     * @throws NullPointerException  if {@code unit} is {@code null}
     * @throws IllegalStateException if called from the executor's own thread
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        if (inEventLoop()) {
            throw new IllegalStateException("cannot await termination of the current thread");
        }
        if (isTerminated()) {
            return true;
        }
        shutdown(timeout, unit);
        // Report the real outcome instead of unconditionally claiming termination.
        return isTerminated();
    }

    /**
     * Publishes {@code command} into the ring buffer for execution on the consumer thread.
     *
     * @param command the task to run; must not be {@code null}
     * @throws NullPointerException       if {@code command} is {@code null}
     * @throws RejectedExecutionException if the executor is not in the started state
     */
    @Override
    public void execute(Runnable command) {
        if (command == null) {
            // Fail in the caller instead of on the consumer thread.
            throw new NullPointerException("command");
        }
        if (state != ST_STARTED) {
            reject(command);
        }
        ringBuffer.publishEvent(TRANSLATOR, command);
    }

    /**
     * Always throws; used to refuse a task once the executor no longer accepts work.
     *
     * @throws RejectedExecutionException always
     */
    protected static void reject(Runnable command) {
        throw new RejectedExecutionException("event executor terminated : " + command);
    }

    /**
     * Shuts the executor down, waiting without a time limit for the Disruptor to finish.
     * Does nothing if a shutdown was already requested.
     */
    @Override
    public void shutdown() {
        if (isShutdown()) {
            return;
        }
        shutdown(-1, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs the actual shutdown: stops accepting tasks, waits for the ring buffer to
     * drain, then shuts the Disruptor down.
     *
     * <p>Synchronized so that the check-then-set on {@link #state} is atomic and a second
     * caller waits for an in-progress shutdown instead of racing it.
     *
     * @param timeout forwarded to {@code Disruptor.shutdown}; the drain loop itself is not
     *                bounded by it
     * @param unit    unit of {@code timeout}
     */
    private synchronized void shutdown(long timeout, TimeUnit unit) {
        if (isShuttingDown()) {
            return;
        }
        state = ST_SHUTDOWN;
        // Spin until every published slot has been consumed.
        while (ringBuffer.remainingCapacity() != ringBuffer.getBufferSize()) {
            Thread.yield();
        }
        try {
            disruptor.shutdown(timeout, unit);
            // Only a successful Disruptor shutdown counts as termination.
            state = ST_TERMINATE;
        } catch (Exception e) {
            logger.error("Exception during onShutdown()", e);
        }
    }

    /** Ring buffer slot: carries one task from a publisher to the consumer thread. */
    private final static class ThreadWork {
        private Runnable runner;
    }

    /** Pre-allocates the ring buffer slots. */
    public final static EventFactory<ThreadWork> FACTORY = new EventFactory<ThreadWork>() {
        @Override
        public ThreadWork newInstance() {
            return new ThreadWork();
        }
    };

    /** Stores the submitted task in the claimed ring buffer slot. */
    public static final EventTranslatorOneArg<ThreadWork, Runnable> TRANSLATOR = new EventTranslatorOneArg<ThreadWork, Runnable>() {
        @Override
        public void translateTo(ThreadWork event, long sequence, Runnable arg0) {
            event.runner = arg0;
        }
    };

    /** Consumer side of the ring buffer: runs the task held by each slot. */
    private static final class RingeventWorkHandler implements EventHandler<ThreadWork> {
        @Override
        public void onEvent(ThreadWork event, long sequence, boolean endOfBatch)
                throws Exception {
            Runnable task = event.runner;
            // Release the slot's reference up front so it is cleared even if the task fails.
            event.runner = null;
            try {
                task.run();
            } catch (Throwable t) {
                // A failing task must not take down the consumer thread; otherwise no
                // further task would run and the shutdown drain loop could never finish.
                logger.warn("A task raised an exception. Task: " + task, t);
            }
        }
    }
}